package com.atguigu.chapter07.b_watermarket;

import com.atguigu.chapter5.source.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @ClassName: Flink01_Tumbling_Window
 * @Description:
 * @Author: kele
 * @Date: 2021/4/6 20:06
 *
 * 在无序的水印中，只要之前的串口不关闭，数据就可以进去
 *
 **/
public class Flink09_WaterMarket_UnOrder {

    public static void main(String[] args) {

        Configuration conf = new Configuration();
        conf.setInteger("rest.port",20000);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        env.socketTextStream("hadoop162",8888)
                .map(datas->{

                    String[] data = datas.split(",");
                    return new WaterSensor(data[0],
                            Long.parseLong(data[1]) * 1000,
                            Integer.valueOf(data[2]));
                })
                //设置分发水印
                .assignTimestampsAndWatermarks(
//                        设置无序流的水印格式，方法泛型设置方式
                        WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))

                                //指明需要将哪个数据作为水印，必须是毫秒值，long类型
                                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                            @Override
                            public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                return element.getTs();
                            }
                        })
                )
                .keyBy(WaterSensor::getId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    @Override
                    public void process(String key,
                                        Context context,
                                        Iterable<WaterSensor> elements,
                                        Collector<String> out) throws Exception {

                        int count = 0;
                        for (WaterSensor sensor : elements) {
                            count++;
                        }

                        String o = "key:"+key+
                                    ",["+context.window().getStart()/1000+
                                    ","+context.window().getEnd()/1000+")"+
                                    ",数据个数："+count;

                        out.collect(o);
                    }
                })
                .print();



        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }


    }

}
